package com.zhao.apitest.window;

import com.zhao.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

/**
 * Demo job: event-time windowed aggregation over sensor readings read from a socket.
 *
 * <p>Each input line is split on {@code ","}; the three resulting fields are passed to the
 * {@code SensorReading} constructor as a String, a Long and a Double. Timestamps and
 * watermarks are assigned with a bounded out-of-orderness of 2 seconds, the stream is keyed
 * by the {@code id} field, and for every 15-second event-time window the reading with the
 * smallest {@code temperature} is printed. Records that arrive after the window's allowed
 * lateness (1 second) are routed to a side output and printed separately.
 *
 * @author xiaoZhao
 * @date 2022/5/25
 */
public class WindowTest3_EventTimeWindow {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event-time semantics for the whole job.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit periodic watermarks every 100 ms.
        env.getConfig().setAutoWatermarkInterval(100);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        DataStream<SensorReading> dataStream = inputStream
                .map(line -> {
                    String[] field = line.split(",");
                    // valueOf replaces the deprecated boxing constructors; parsing is identical.
                    return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
                })
                // Alternative for strictly ascending input: no out-of-orderness bound is
                // needed, the extracted timestamp is used directly as the event time.
                //.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
                //    @Override
                //    public long extractAscendingTimestamp(SensorReading element) {
                //        return element.getTimestamp() * 1000;
                //    }
                //})

                // Out-of-order input: the watermark trails the largest timestamp seen so far
                // by the configured bound (2 seconds).
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                            @Override
                            public long extractTimestamp(SensorReading element) {
                                // Flink expects epoch milliseconds; the reading's timestamp is
                                // presumably in seconds (verify against the data source).
                                return element.getTimestamp() * 1000;
                            }
                        });

        // The tag MUST be an anonymous subclass (note the trailing "{}"): Flink reads the
        // element type from the generic superclass, and a plain "new OutputTag<>(...)"
        // fails with InvalidTypesException when the job graph is built.
        OutputTag<SensorReading> tag = new OutputTag<SensorReading>("late") {};

        // Event-time window aggregation: minimum temperature per sensor id per 15 seconds.
        SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                // Keep window state for 1 extra second so slightly late records still update the result.
                .allowedLateness(Time.seconds(1))
                // Records arriving after the allowed lateness go to the side output instead of being dropped.
                .sideOutputLateData(tag)
                .minBy("temperature");

        minTempStream.print("minTemp");
        minTempStream.getSideOutput(tag).print("late");
        env.execute();
    }
}
